package org.example.utils

import _root_.io.searchbox.client.{JestClient, JestClientFactory}
import _root_.io.searchbox.core._
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.indices.Flush
import org.apache.log4j.{Level, Logger}
import org.example.common.Logging
import org.example.constant.ApolloConst

import java.net.SocketTimeoutException
import java.util

/**
 * Elasticsearch helper built on the Jest HTTP client.
 *
 * On first access the object builds a single [[JestClientFactory]] from the
 * node list and port exposed by `ApolloConst`; every client handed out by
 * [[getClient]] comes from that factory.
 */
object EsUtils extends Logging {
  // Keep the Jest factory's own logger at WARN so it does not emit lower-level output.
  Logger.getLogger("io.searchbox.client.JestClientFactory").setLevel(Level.WARN)

  /** Maximum number of documents packed into one bulk request. */
  private val BulkBatchSize = 1000

  // Cluster node URLs ("http://host:port"), in the order they appear in the configuration.
  // Kept as a public `var` for source compatibility; note that re-assigning it after
  // initialisation has no effect on `config`/`factory`, which are built once below.
  var serverList = new util.LinkedHashSet[String]
  // Trim each entry and drop blanks so that "a, b" or a stray comma cannot yield a malformed URL.
  ApolloConst.esNodes.split(",").map(_.trim).filter(_.nonEmpty).foreach { node =>
    serverList.add(s"http://$node:${ApolloConst.esPort}")
  }

  // Factory from which all clients are obtained.
  val factory: JestClientFactory = new JestClientFactory()

  // HTTP client settings shared by every client created by `factory`.
  // NOTE(review): the timeout values are presumably milliseconds — confirm against the Jest docs.
  val config: HttpClientConfig = new HttpClientConfig.Builder(serverList)
    .multiThreaded(true)
    .maxTotalConnection(20)
    .connTimeout(10000)
    .readTimeout(10000)
    .build()
  factory.setHttpClientConfig(config)

  /**
   * Obtains a client from the shared factory.
   *
   * @return the object returned by `factory.getObject`
   */
  def getClient(): JestClient = {
    factory.getObject
  }

  /**
   * Indexes a single document without specifying a document id.
   *
   * @param jestClient client used to execute the request
   * @param indexName  target index
   * @param types      target document type
   * @param source     document payload handed to `Index.Builder`
   * @return the result of executing the index action (type inferred from Jest)
   */
  def insertIntoEs(jestClient: JestClient, indexName: String, types: String, source: Any) = {
    val indexAction: Index = new Index.Builder(source).index(indexName).`type`(types).build()
    jestClient.execute(indexAction)
  }

  /**
   * Indexes a single document under an explicit document id.
   *
   * @param jestClient client used to execute the request
   * @param indexName  target index
   * @param source     document payload handed to `Index.Builder`
   * @param id         document id
   * @param indexType  target document type
   * @return the result of executing the index action (type inferred from Jest)
   */
  def insertIntosEs(jestClient: JestClient, indexName: String, source: Any, id: String, indexType: String) = {
    val indexAction: Index = new Index.Builder(source).index(indexName).id(id).`type`(indexType).build()
    jestClient.execute(indexAction)
  }

  /**
   * Builds one bulk request from a batch of elements.
   *
   * An element shaped like `(id: String, source)` is indexed under that id; any
   * other element is used as the document itself and no id is set on the action.
   */
  private def buildBulk(indexName: String, indexType: String, batch: Iterable[Any]): Bulk = {
    val bulkBuilder: Bulk.Builder = new Bulk.Builder().defaultIndex(indexName).defaultType(indexType)
    batch.foreach {
      case (id: String, source) =>
        bulkBuilder.addAction(new Index.Builder(source).id(id).build())
      case source =>
        bulkBuilder.addAction(new Index.Builder(source).build())
    }
    bulkBuilder.build()
  }

  /**
   * Executes a bulk request and logs a warning when the returned result reports
   * failure, so that a rejected request does not go unnoticed.
   */
  private def executeBulk(jestClient: JestClient, indexName: String, bulk: Bulk): Unit = {
    val result = jestClient.execute(bulk)
    if (!result.isSucceeded) {
      warn(s"批量写入索引:${indexName}失败：${result.getErrorMessage}")
    }
  }

  /**
   * Bulk-indexes `sources` (e.g. the data of one RDD partition) through a
   * caller-supplied client, in batches of at most [[BulkBatchSize]] documents.
   *
   * Exceptions thrown while executing a request propagate to the caller; the
   * client is not closed by this method.
   *
   * @param jestClient client used to execute the requests
   * @param indexName  default index for every action
   * @param indexType  default document type for every action
   * @param sources    documents, or `(id: String, document)` pairs; `null` or empty is a no-op
   */
  def insertBulk(jestClient: JestClient, indexName: String, indexType: String, sources: Iterable[Any]): Unit = {
    if (sources != null && sources.nonEmpty) {
      sources.grouped(BulkBatchSize).foreach { batch =>
        executeBulk(jestClient, indexName, buildBulk(indexName, indexType, batch))
      }
    }
  }

  /**
   * Bulk-indexes `sources` (e.g. the data of one RDD partition) with a client
   * obtained from [[getClient]], in batches of at most [[BulkBatchSize]] documents.
   *
   * A single client is used for the whole call and released in `finally`. A
   * `SocketTimeoutException` on one batch is logged and the remaining batches are
   * still attempted; any other exception propagates after the client is released.
   *
   * @param indexName default index for every action
   * @param indexType default document type for every action
   * @param sources   documents, or `(id: String, document)` pairs; `null` or empty is a no-op
   */
  def insertBulk(indexName: String, indexType: String, sources: Iterable[Any]): Unit = {
    if (sources != null && sources.nonEmpty) {
      // One client for all batches instead of building and tearing one down per batch.
      val jestClient = getClient()
      try {
        sources.grouped(BulkBatchSize).foreach { batch =>
          // Build outside the inner try so that only the network call is covered by the timeout handler.
          val bulk = buildBulk(indexName, indexType, batch)
          try {
            executeBulk(jestClient, indexName, bulk)
          } catch {
            case _: SocketTimeoutException =>
              warn("插入数据超时！")
          }
        }
      } finally {
        // Presumably closes the client; see CommonUtils.autoCloseable.
        CommonUtils.autoCloseable(jestClient)
      }
    }
  }

  /**
   * Executes a Flush action on the given index. A `SocketTimeoutException` is
   * logged as a warning and swallowed; other exceptions propagate.
   *
   * NOTE(review): the original comment says this makes newly inserted data
   * immediately queryable; in Elasticsearch that is normally the job of a
   * refresh rather than a flush — confirm that Flush is the intended action.
   *
   * @param jestClient client used to execute the request
   * @param indexName  index to flush
   */
  def flush(jestClient: JestClient, indexName: String): Unit = {
    try {
      val flushAction = new Flush.Builder().addIndex(indexName).build()
      jestClient.execute(flushAction)
    } catch {
      case _: SocketTimeoutException =>
        // Route through the logger, like the rest of this object, instead of stdout.
        warn(s"刷新索引:${indexName}失败，连接超时！")
    }
  }

}
